package com.kaigejava.rocketmq.maindemo.consumer.base;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Demo RocketMQ message consumer: subscribes to a single topic and prints the
 * body of every message handed to its listener.
 *
 * @author 凯哥Java
 * @since 2022/10/19 8:47
 */
public class Consumer {

    /**
     * Configures a push consumer, registers a concurrent message listener and starts it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        // 1: Create the consumer and specify its consumer group name.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("group1");
        // 2: Specify the name server address.
        pushConsumer.setNamesrvAddr("192.168.50.132:9876");
        // 3: Subscribe to the topic and tag expression.
        // NOTE(review): the empty tag expression appears to be treated like "*" (all tags)
        // by the client — confirm against the RocketMQ version in use.
        pushConsumer.subscribe("base-async-topic", "");
        // The message model is left at the client default here. To choose it explicitly, call
        // pushConsumer.setMessageModel(...) with MessageModel.CLUSTERING (load balancing)
        // or MessageModel.BROADCASTING (broadcast).
        // 4: Register the callback that processes received messages.
        pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list
                    , ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    // Decode with an explicit charset so the output does not depend on the
                    // JVM's platform default encoding.
                    // Assumes the producer encodes message bodies as UTF-8 — TODO confirm.
                    String str = new String(msg.getBody(), StandardCharsets.UTF_8);
                    System.out.println("消息为:" + str);
                }
                // Report the whole batch as consumed successfully.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5: Start the consumer.
        pushConsumer.start();
        System.out.println("消费者启动完成");
    }
}
